package cn.xing.modules.sys.service.impl;

import cn.xing.common.constant.SocketIOConstans;
import cn.xing.modules.sys.service.SocketIOService;
import cn.xing.modules.sys.service.SysConfigService;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static cn.xing.common.constant.SocketIOConstans.NEW_PAYED_ORDER;
import static cn.xing.common.constant.SysConfigConstans.SYS_AUDIO_REPORT;

@Slf4j
@Service(value = "socketIOService")
public class SocketIOServiceImpl implements SocketIOService {
    @Autowired
    private SocketIOServer socketIOServer;
    @Autowired
    private SysConfigService sysConfigService;

    /**
     * 存放已连接的客户端
     */
    private static Map<String, List<SocketIOClient>> clientMap = new ConcurrentHashMap<>();

    /**
     * Spring IoC容器创建之后，在加载SocketIOServiceImpl Bean之后启动
     */
    @PostConstruct
    private void autoStartup() {
        start();
    }

    /**
     * Spring IoC容器在销毁SocketIOServiceImpl Bean之前关闭,避免重启项目服务端口占用问题
     */
    @PreDestroy
    private void autoStop() {
        stop();
    }

    @Override
    public void start() {
        // 监听客户端连接
        socketIOServer.addConnectListener(client -> {
            log.debug("************ 客户端： " + getIpByClient(client) + " 已连接 ************");
            // 自定义事件`connected` -> 与客户端通信  （也可以使用内置事件，如：Socket.EVENT_CONNECT）
            client.sendEvent("connected", "你成功连接上了哦...");

        });

        // 监听客户端断开连接
        socketIOServer.addDisconnectListener(client -> {
            String clientIp = getIpByClient(client);
            log.debug(clientIp + " *********************** " + "客户端已断开连接");
            String userId = getUserIdByClient(client);
            if (userId != null) {
                List<SocketIOClient> userClientList = clientMap.getOrDefault(userId, new ArrayList<>());

                List<SocketIOClient> collect = userClientList.stream().filter((item) -> {
                    return !client.getSessionId().toString().equals(item.getSessionId().toString());
                }).collect(Collectors.toList());

                // 如果删除后，没有连接的客户端，则删除该用户记录
                if (collect.isEmpty()) {
                    clientMap.remove(userId);
                } else {
                    clientMap.put(userId, collect);
                }
                client.disconnect();
            }
        });


        // 登录->监听客户端消息
        // 客户端推送事件时，onData接受数据，这里是string类型的json数据，还可以为Byte[],object其他类型
        socketIOServer.addEventListener(SocketIOConstans.LOGIN_EVENT, String.class, (client, data, ackSender) -> {
            String clientIp = getIpByClient(client);
            String userId = data;
            log.debug(clientIp + " ************ 客户端登录：userId：" + userId);

            if (StringUtils.isNotBlank(userId) && Integer.parseInt(userId) > 0) {
                List<SocketIOClient> userClientList = clientMap.getOrDefault(userId, new ArrayList<>());

                // 如果不包含当前 client
                if (!userClientList.contains(client)) {
                    userClientList.add(client);
                    clientMap.put(userId, userClientList);
                }

            }

            log.info("目前客户：[{}]", clientMap);
            Collection<SocketIOClient> clients = socketIOServer.getBroadcastOperations().getClients();
            log.info("所有客户端，[{}]", clients);
        });


        // 启动服务
        socketIOServer.start();

        // broadcast: 默认是向所有的socket连接进行广播，但是不包括发送者自身，如果自己也打算接收消息的话，需要给自己单独发送。
        // new Thread(() -> {
        //     int i = 0;
        //     while (true) {
        //         try {
        //             // 每3秒发送一次广播消息
        //             Thread.sleep(6000);
        //             socketIOServer.getBroadcastOperations().sendEvent(NEW_PAYED_ORDER, "1");
        //
        //             log.info(clientMap.toString());
        //         } catch (InterruptedException e) {
        //             e.printStackTrace();
        //         }
        //     }
        // }).start();
    }

    /**
     * 根据客户端 获取用户id
     *
     * @param client
     * @return
     */
    private String getUserIdByClient(SocketIOClient client) {
        Set<String> userIds = clientMap.keySet();
        for (String userId : userIds) {
            List<SocketIOClient> socketIOClients = clientMap.get(userId);

            for (int i = 0; i < socketIOClients.size(); i++) {
                if (client.getSessionId().toString().equals(socketIOClients.get(i).getSessionId().toString())) {
                    return userId;
                }
            }
        }
        return null;
    }

    @Override
    public void stop() {
        if (socketIOServer != null) {
            socketIOServer.stop();
            socketIOServer = null;
        }
    }

    @Override
    public void pushMessageToUser(String userId, String eventKey, String msgContent) {
        List<SocketIOClient> socketIOClients = clientMap.get(userId);
        for (SocketIOClient client : socketIOClients) {
            client.sendEvent(eventKey, msgContent);
        }


    }

    @Override
    public void broadCastExcludeUser(List<String> userIdList, String eventKey, String msgContent) {
        Set<String> userIds = clientMap.keySet();
        // 排除
        userIds.removeAll(userIdList);
        for (String userId : userIds) {
            List<SocketIOClient> socketIOClients = clientMap.get(userId);

            for (int i = 0; i < socketIOClients.size(); i++) {
                socketIOClients.get(i).sendEvent(eventKey, msgContent);
            }
        }
    }

    @Override
    public void broadCastExcludeUser(String userId, String eventKey, String msgContent) {
        // socketIOServer.getBroadcastOperations().sendEvent("myBroadcast", "广播消息 " + DateUtil.now());
        broadCastExcludeUser(Arrays.asList(userId), eventKey, msgContent);
    }

    @Override
    public void broadCast(String eventKey, String msgContent) {
        socketIOServer.getBroadcastOperations().sendEvent(eventKey, msgContent);

    }

    @Override
    public void newOrderBroadCast() {
        String sysAudioReport = sysConfigService.getValue(SYS_AUDIO_REPORT);
        // 1 语音播报
        broadCast(NEW_PAYED_ORDER, sysAudioReport);
    }


    /**
     * 获取连接的客户端ip地址
     *
     * @param client: 客户端
     * @return: java.lang.String
     */
    private String getIpByClient(SocketIOClient client) {
        String sa = client.getRemoteAddress().toString();
        String clientIp = sa.substring(1, sa.indexOf(":"));
        return clientIp;
    }


}
